package com.bigfans.framework.kafka;

import com.bigfans.framework.utils.ReflectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.lang.reflect.Method;
import java.util.*;

/**
 * @author lichong
 * @create 2018-01-31 下午9:02
 **/
public class KafkaConsumerTaskManager {

    private Map<String, KafkaConsumerTask> taskPool = new HashMap<>();
    private Map<String, List<KafkaListenerBean>> listenersMap;
    private KafkaFactory factory;
    private volatile boolean running = true;
    private KafkaConsumer<String, String> consumer;

    public KafkaConsumerTaskManager(KafkaFactory factory) {
        this.factory = factory;
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                KafkaConsumerTaskManager.this.stop();
            }
        });
    }

    final class KafkaConsumerTaskManagerThread implements Runnable {
        @Override
        public void run() {
            while (running) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
                if (consumerRecords.count() > 0) {
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                        KafkaConsumerTask consumerTask = taskPool.get(consumerRecord.topic());
                        if (consumerTask == null) {
                            continue;
                        }
                        consumerTask.processRecord(consumerRecord);
                    }
                }
            }
        }
    }

    public <T> void registerListeners(Iterator<T> listenersIterator) {
        while (listenersIterator.hasNext()) {
            registerListener(listenersIterator.next());
        }
    }

    public <T> void registerListener(T listenerBean) {
        List<Method> receivers = ReflectionUtils.getMethodsWithAnnotation(listenerBean.getClass(), KafkaListener.class);
        if (receivers == null) {
            return;
        }
        for (Method receiver : receivers) {
            KafkaListener annotation = receiver.getAnnotation(KafkaListener.class);
            String[] topics = annotation.topics();
            if (topics != null && topics.length != 0) {
                for (String topic : topics) {
                    KafkaListenerBean bean = new KafkaListenerBean();
                    bean.setBean(listenerBean);
                    bean.setListenerMethod(receiver);
                    bean.setId(annotation.id());
                    bean.setTopic(topic);
                    this.putListenerBean(topic, bean);
                }
            } else {
                Class<?>[] parameterTypes = receiver.getParameterTypes();
                if (parameterTypes == null || parameterTypes.length == 0) {
                    throw new RuntimeException("topic is null for kafka listener: " + receiver.getName());
                }
                String topic = parameterTypes[0].getName();
                KafkaListenerBean bean = new KafkaListenerBean();
                bean.setBean(listenerBean);
                bean.setListenerMethod(receiver);
                bean.setId(annotation.id());
                bean.setTopic(topic);
                bean.setThreadCount("".equals(annotation.threadCount()) ? null : Integer.valueOf(annotation.threadCount()));
                this.putListenerBean(topic, bean);
            }
        }
    }

    private void putListenerBean(String topic, KafkaListenerBean bean) {
        if (listenersMap == null) {
            listenersMap = new HashMap<>();
        }
        List<KafkaListenerBean> listenerBeans = null;
        if (listenersMap.containsKey(topic)) {
            listenerBeans = listenersMap.get(topic);
        } else {
            listenerBeans = new ArrayList<>();
            listenersMap.put(topic, listenerBeans);
        }
        listenerBeans.add(bean);
    }

    public void consume() {
        if (listenersMap == null || listenersMap.isEmpty()) {
            return;
        }
        try {
            consumer = factory.createConsumer();
            consumer.subscribe(new ArrayList<>(listenersMap.keySet()));
            for (Map.Entry<String, List<KafkaListenerBean>> entry : listenersMap.entrySet()) {
                KafkaConsumerTask task = new KafkaConsumerTask(entry.getKey(), entry.getValue());
                this.taskPool.put(entry.getKey(), task);
            }
            KafkaConsumerTaskManagerThread managerThread = new KafkaConsumerTaskManagerThread();
            new Thread(managerThread).start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void stop() {
        this.running = false;
        if (this.taskPool != null) {
            for (Map.Entry<String, KafkaConsumerTask> entry : taskPool.entrySet()) {
                KafkaConsumerTask task = entry.getValue();
                task.stop();
            }
        }
    }
}
